package com.zhike.utils;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.zhike.constant.MysqlConstant;
import com.zhike.function.CDCDeserialization;

/**
 * @Author: zhike
 * @CreateTime: 2024/1/18
 * @Description:
 */
public class MysqlUtils {
    //创建MysqlSource
    public static MySqlSource<String> getMysqlSource(String table_list) {
        return MySqlSource.<String>builder()
                .hostname(MysqlConstant.hostname)
                .port(MysqlConstant.port)
                .username(MysqlConstant.username)
                .password(MysqlConstant.password)
                //设置mysql数据库
                .databaseList("car_data")
                //设置mysql表(多个用,分隔)
                .tableList(table_list)
                //设置cdc启动方式
                .startupOptions(StartupOptions.initial())
                //设置反序列化器
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
